// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package wsPool

import (
	"errors"
	"fmt"
	"gitee.com/rczweb/wsPool/util/grpool"
	"github.com/gorilla/websocket"
	"net/http"
	"sync"
	"time"
)

const (
	// Time allowed to read the next pong message from the peer.
	pongWait = 60 * time.Second

	// Send pings to peer with this period. Must be less than pongWait.
	pingPeriod = (pongWait * 9) / 10 //1 * time.Second//(pongWait * 9) / 10
	//var pingPeriod =27* time.Second //1 * time.Second//(pongWait * 9) / 10

	// Time allowed to write a message to the peer.
	writeWait = 30 * time.Second
	// Maximum message size allowed from peer.
	maxMessageSize = 1024 * 1024 * 20
)

var upgrader = websocket.Upgrader{
	//ReadBufferSize:  1024 * 1024,
	//WriteBufferSize: 1024 * 1024,
	// 默认允许WebSocket请求跨域，权限控制可以由业务层自己负责，灵活度更高
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

/*连接参数结构体*/
type Config struct {
	Id        string   //标识连接的名称
	Type      string   //连接类型或path
	Channel   []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
	Goroutine int      //每个连接开启的go程数里 默认为20
}

type RuntimeInfo struct {
	Id              string //标识连接的名称
	Type            string //连接类型或path
	Ip              string
	Channel         []string  //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
	OpenTime        time.Time //连接打开时间
	LastReceiveTime time.Time //最后一次接收到数据的时间
	LastSendTime    time.Time //最后一次发送数据的时间
}

/*//接收消息结构
messageType=1 为string
messageType=2 为[]byte*/
type Message struct {
	MsgType int
	Message []byte
	MsgTime time.Time
}

// Client is a middleman between the websocket connection and the hub.
type Client struct {
	hub *hub
	// The websocket connection.
	conn            *websocket.Conn
	types           string    //连接类型或path
	openTime        time.Time //连接打开时间
	CloseTime       time.Time //连接断开的时间
	lastReceiveTime time.Time //最后一次接收到数据的时间
	lastSendTime    time.Time //最后一次发送数据的时间
	Id              string    //标识连接的名称
	mux             *sync.Mutex
	IsClose         chan bool //连接的状态。true为关闭
	channel         []string  //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道

	grpool   *grpool.Pool
	// Buffered channel of outbound messages.
	sendCh   chan *Message //发送消息的缓冲管首
	recvCh   chan *Message //接收消息的缓冲管首
	recvPing chan int      //收到ping的存储管道，方便回复pong处理
	sendPing chan int      //发送ping的存储管道，方便收到pong处理下次发ping
	onError          func(error)
	onOpen           func() //连接成功的回调
	onPing           func() //收到ping
	onPong           func() //收到pong
	onMessage        func(message ...*Message)
	onClose          func()
	pingPeriodTicker *time.Timer //定时发送ping的定时器
}

// readPump pumps messages from the websocket connection to the hub.
//
// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
func (c *Client) readPump() {
	defer func() {
		fmt.Println("连接己关闭或者断开，正在清理对像")
		c.conn.Close()
		//触发连接关闭的事件回调
		c.onClose() //先执行完关闭回调，再请空所有的回调
		c.OnError(nil)
		c.OnOpen(nil)
		c.OnMessage(nil)
		c.OnClose(nil)
		c.OnPong(nil)
		c.OnPing(nil)
		close(c.recvPing)
		close(c.sendPing)
		c.grpool.Close()
		c.hub.RemoveClient(c)
		dump()
	}()
	for {
		select {
		case <-c.IsClose:
			return
		default:
			msgType, message, err := c.conn.ReadMessage()
			if err != nil {
				if websocket.IsUnexpectedCloseError(err,
					websocket.CloseAbnormalClosure,
					websocket.CloseGoingAway,
					websocket.CloseProtocolError,
					websocket.CloseUnsupportedData,
					websocket.CloseNoStatusReceived,
					websocket.CloseAbnormalClosure,
					websocket.CloseInvalidFramePayloadData,
					websocket.ClosePolicyViolation,
					websocket.CloseMessageTooBig,
					websocket.CloseMandatoryExtension,
					websocket.CloseInternalServerErr,
					websocket.CloseServiceRestart,
					websocket.CloseTryAgainLater,
					websocket.CloseTLSHandshake) {
					c.onError(errors.New("连接ID：" + c.Id + "ReadMessage Is Unexpected Close Error:" + err.Error()))
					//c.closeChan<-true;
					goto loop1
				}else{
					c.onError(errors.New("连接ID：" + c.Id + "ReadMessage other error:" + err.Error()))
				}
				goto loop1
			}
			c.conn.SetReadDeadline(time.Now().Add(pongWait))
			c.pingPeriodTicker.Reset(pingPeriod)
			c.lastReceiveTime = time.Now()
			//c.onMessage(&Message{MsgType: msgType, Message: message})
			c.readMessage(&Message{MsgType: msgType, Message: message,MsgTime: time.Now()})
		}
	}
loop1:
	c.Close()
}

// 读取消息写管道缓冲区
func (c *Client) readMessage(msg *Message) {
	timeout := time.NewTimer(time.Millisecond * 800)
	defer timeout.Stop()
	select {
	case <-c.IsClose:
		c.onError(errors.New("readMessage连接" + c.Id + ",连接己在关闭，不进行消息接收"))
		return
	case c.recvCh <- msg:
		return
	case <-timeout.C:
		c.onError(errors.New("recvCh 消息管道blocked,写入消息超时,管道长度：" + string(len(c.recvCh))))
		return
	}
}

// 单个连接接收消息
func (c *Client) recvMessage() {
	defer func() {
		dump()
	}()
loop:
	for {
		select {
		case <-c.IsClose:
			return
		case data, ok := <-c.recvCh:
			if !ok {
				break loop
			}

				// Add queued  messages to the current handle message func.
				n := len(c.recvCh)
				if n > 0 {
					msgList:=make([]*Message,0)
					msgList=append(msgList, data)
					for i := 0; i < n; i++ {
						d, ok := <-c.recvCh
						if !ok  {
							break
						}
						msgList=append(msgList, d)
					}
					c.grpool.Add(func() {
						c.onMessage(msgList...)
					})
				}else{
					c.grpool.Add(func() {
						c.onMessage(data)
					})
				}

		}
	}
}

// writePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *Client) writePump() {
	defer func() {
		dump()
	}()
loop:
	for {
		select {
		case <-c.IsClose:
			return
		case d, ok := <-c.sendCh:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// The hub closed the channel.
				//说明管道己经关闭
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				//glog.Error("连接ID："+c.Id,"wsServer发送消息失败,一般是连接channel已经被关闭：(此处服务端会断开连接，使客户端能够感知进行重连)")
				goto loop1
			}

		/*	w, err := c.conn.NextWriter(d.MsgType)
			if err != nil {
				return
			}*/
			c.lastSendTime = time.Now()
			err:=c.conn.WriteMessage(d.MsgType,d.Message)
			//_, err = w.Write(d.Message)
			if err != nil {
				c.onError(errors.New("连接ID：" + c.Id + "写消息进写入IO错误！连接中断" + err.Error()))
				goto loop1
			}
			// Add queued  messages to the current websocket message.
			//同时控制消息的最大值
			n := len(c.sendCh)
			if n > 0 {
				for i := 0; i < n; i++ {
					data, ok := <-c.sendCh
					if !ok  {
						// The hub closed the channel.
						//说明管道己经关闭
						c.conn.WriteMessage(websocket.CloseMessage, []byte{})
						//glog.Error("连接ID："+c.Id,"wsServer发送消息失败,一般是连接channel已经被关闭：(此处服务端会断开连接，使客户端能够感知进行重连)")
						goto loop1
					}
					//_, err = w.Write(data.Message)
					err=c.conn.WriteMessage(data.MsgType,data.Message)
					if err != nil {
						c.onError(errors.New("连接ID：" + c.Id + "写上次连接未发送的消息消息进写入IO错误！连接中断" + err.Error()))
						return
					}
				}
			}

			//关闭写入io对象
			/*if err := w.Close(); err != nil {
				c.onError(errors.New("连接ID：" + c.Id + "关闭写入IO对象出错，连接中断" + err.Error()))
				goto loop1
			}*/

		case <-c.pingPeriodTicker.C: //定时发送ping
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				c.onError(errors.New("连接ID：" + c.Id + "关闭写入IO对象出错，连接中断" + err.Error()))
				goto loop1
			}
		case p, ok := <-c.recvPing: //回复pong
			if !ok {
				break loop
			}
			if p == 1 {
				c.conn.SetWriteDeadline(time.Now().Add(writeWait))
				if err := c.conn.WriteMessage(websocket.PongMessage, nil); err != nil {
					c.onError(errors.New("回复客户端PongMessage出现异常:" + err.Error()))
					goto loop1
				}
			}
		}
	}
loop1:
	c.close()
}

func (c *Client) send(msg *Message) {

	timeout := time.NewTimer(time.Millisecond * 800)
	defer timeout.Stop()
	select {
	case <-c.IsClose:
		c.onError(errors.New("send连接" + c.Id + ",连接己在关闭，消息发送失败"))
		return
	case c.sendCh <- msg:
		return
	case <-timeout.C:
		c.onError(errors.New("sendCh消息管道blocked,写入消息超时,管道长度：" + string(len(c.sendCh))))
		return
	}
	//c.sendCh<-msg
}

func (c *Client) close() {
	c.mux.Lock()
	defer c.mux.Unlock()
	select {
	case <-c.IsClose:
		return
	default:
		close(c.IsClose)
	}
}
